Index: build.xml
===================================================================
--- build.xml	(revision 921731)
+++ build.xml	(working copy)
@@ -544,6 +544,7 @@
       <fileset file="${conf.dir}/commons-logging.properties"/>
       <fileset file="${conf.dir}/log4j.properties"/>
       <fileset file="${conf.dir}/hadoop-metrics.properties"/>
+      <fileset file="${conf.dir}/hadoop-performance-analysis.properties"/>
       <zipfileset dir="${build.webapps}" prefix="webapps"/>
     </jar>
   </target>
Index: conf/hadoop-performance-analysis.properties
===================================================================
--- conf/hadoop-performance-analysis.properties	(revision 0)
+++ conf/hadoop-performance-analysis.properties	(revision 0)
@@ -0,0 +1,20 @@
+# maximum swap memory in KB that can be used
+swap.memory.threshold=10240
+
+#maximum average percentage of time the CPU may be idle during a phase
+idle.cpu.max.percent.threshold=15
+
+#percentage of files that can be smaller than current split size
+file.size.percent.threshold=50.0
+
+#minimum map task duration in seconds
+average.map.task.duration.seconds=30
+
+#maximum map iterations
+map.iterations=10.0
+
+#spill factor
+num.spill.factor=3.0
+
+#percent load on reducers in partitioning
+reduce.load.percent=80
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties	(revision 921731)
+++ ivy/libraries.properties	(working copy)
@@ -20,7 +20,7 @@
 apacheant.version=1.7.0
 
 checkstyle.version=4.2
-
+java5.home=C:\"Program Files"\Java\jre1.5.0_06
 commons-cli.version=1.2
 commons-codec.version=1.3
 commons-collections.version=3.1
Index: src/core/org/apache/hadoop/util/Shell.java
===================================================================
--- src/core/org/apache/hadoop/util/Shell.java	(revision 921731)
+++ src/core/org/apache/hadoop/util/Shell.java	(working copy)
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -138,7 +139,6 @@
   private void runCommand() throws IOException { 
     ProcessBuilder builder = new ProcessBuilder(getExecString());
     boolean completed = false;
-    
     if (environment != null) {
       builder.environment().putAll(this.environment);
     }
@@ -184,6 +184,7 @@
       }
       // wait for the process to finish and check the exit code
       exitCode = process.waitFor();
+      
       try {
         // make sure that the error thread exits
         errThread.join();
Index: src/mapred/org/apache/hadoop/mapred/JobHistory.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobHistory.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/JobHistory.java	(working copy)
@@ -122,7 +122,9 @@
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
-    TRACKER_NAME, STATE_STRING, VERSION
+    TRACKER_NAME, STATE_STRING, VERSION, IDLE_CPU_PER_SUM_MAP, IDLE_CPU_PER_SUM_REDUCE,
+    TOTAL_HEARTBEAT_MAP, TOTAL_HEARTBEAT_REDUCE, MIN_FREE_MEM_MAP, MIN_FREE_MEM_REDUCE,
+    MAX_SWAP_MEM_MAP, MAX_SWAP_MEM_REDUCE, TOTAL_CPU_CORES, MAP_COUNTERS, REDUCE_COUNTERS
   }
 
   /**
@@ -1235,6 +1237,49 @@
         }
       }
     }
+
+	/** Logs a Job history record holding the per-phase CPU, heartbeat, memory and swap statistics of jobId. */
+	public static void logPerformanceParameters(
+			JobID jobId,
+			double idleCPUPercentageSumWhileMap,
+			double idleCPUPercentageSumWhileReduce,
+			long totalHeartbeatsWhileMap, long totalHeartbeatsWhileReduce,
+			long maximumSwapMemoryUsedInKBWhileMap,
+			long maximumSwapMemoryUsedInKBWhileReduce,
+			long minimumFreeMemoryInKBWhileMap,
+			long minimumFreeMemoryInKBWhileReduce,
+			long totalCPUCoresInCluster) {
+		if (disableHistory) {
+			return; // history logging is switched off
+		}
+		ArrayList<PrintWriter> jobWriters = openJobs.get(JOBTRACKER_UNIQUE_STRING + jobId);
+		if (jobWriters == null) {
+			return; // no writers are registered for this job
+		}
+		Keys[] keys = {Keys.IDLE_CPU_PER_SUM_MAP, Keys.IDLE_CPU_PER_SUM_REDUCE, Keys.TOTAL_HEARTBEAT_MAP,
+				Keys.TOTAL_HEARTBEAT_REDUCE, Keys.MIN_FREE_MEM_MAP, Keys.MIN_FREE_MEM_REDUCE,
+				Keys.MAX_SWAP_MEM_MAP, Keys.MAX_SWAP_MEM_REDUCE, Keys.TOTAL_CPU_CORES};
+		String[] values = {String.valueOf(idleCPUPercentageSumWhileMap), String.valueOf(idleCPUPercentageSumWhileReduce),
+				String.valueOf(totalHeartbeatsWhileMap), String.valueOf(totalHeartbeatsWhileReduce),
+				String.valueOf(minimumFreeMemoryInKBWhileMap), String.valueOf(minimumFreeMemoryInKBWhileReduce),
+				String.valueOf(maximumSwapMemoryUsedInKBWhileMap), String.valueOf(maximumSwapMemoryUsedInKBWhileReduce),
+				String.valueOf(totalCPUCoresInCluster)};
+		JobHistory.log(jobWriters, RecordTypes.Job, keys, values);
+	}
+
+	/** Logs a Job history record holding the escaped compact strings of the map and reduce counters. */
+	public static void logCounters(JobID jobId, Counters mapCounters,
+			Counters reduceCounters) {
+		if (disableHistory) {
+			return;
+		}
+		ArrayList<PrintWriter> jobWriters = openJobs.get(JOBTRACKER_UNIQUE_STRING + jobId);
+		if (jobWriters != null) {
+			Keys[] keys = {Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS};
+			String[] values = {mapCounters.makeEscapedCompactString(), reduceCounters.makeEscapedCompactString()};
+			JobHistory.log(jobWriters, RecordTypes.Job, keys, values);
+		}
+	}
   }
   
   /**
Index: src/mapred/org/apache/hadoop/mapred/JobInProgress.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/JobInProgress.java	(working copy)
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -99,6 +100,15 @@
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
+  double IdleCPUPercentageSumWhileMap = 0;
+  long totalHeartbeatsWhileMap = 0;
+  double IdleCPUPercentageSumWhileReduce = 0;
+  long totalHeartbeatsWhileReduce = 0;
+  long minimumFreeMemoryInKBWhileMap = Long.MAX_VALUE;
+  long minimumFreeMemoryInKBWhileReduce = Long.MAX_VALUE;
+  long maximumSwapMemoryUsedInKBWhileMap = 0;
+  long maximumSwapMemoryUsedInKBWhileReduce = 0;
+  boolean enablePerformanceDiagnosis;
   private volatile boolean launchedCleanup = false;
   private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
@@ -130,6 +140,14 @@
   
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
+  
+  private Map<String,Float> cpuIdlePercentageMap = new HashMap<String,Float>(); // idle CPU % per tracker name
+  // free memory in KB per tracker name
+  private Map<String,Long> freeMemoryMap = new HashMap<String,Long>();
+  // swap used in KB, relative to the tracker's initial swap usage, per tracker name
+  private Map<String,Long> swapMemoryMap = new HashMap<String,Long>();
+  // cache memory in KB per tracker name
+  private Map<String,Long> cacheMemoryMap = new HashMap<String,Long>();
 
   private final int maxLevel;
 
@@ -2108,10 +2126,16 @@
       this.finishTime = System.currentTimeMillis();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
+      JobHistory.JobInfo.logPerformanceParameters(this.status.getJobID(),IdleCPUPercentageSumWhileMap,IdleCPUPercentageSumWhileReduce,
+				  totalHeartbeatsWhileMap, totalHeartbeatsWhileReduce,
+				  maximumSwapMemoryUsedInKBWhileMap, maximumSwapMemoryUsedInKBWhileReduce,
+				  minimumFreeMemoryInKBWhileMap, minimumFreeMemoryInKBWhileReduce, jobtracker.getTotalCPUCoresInCluster());
+      JobHistory.JobInfo.logCounters(this.status.getJobID(),getMapCounters(),getReduceCounters());
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks, failedMapTasks, 
                                      failedReduceTasks, getCounters());
+      
       // Note that finalize will close the job history handles which garbage collect
       // might try to finalize
       garbageCollect();
@@ -2655,4 +2679,78 @@
       return Values.REDUCE.name();
     }
   }
+/** Returns the live map (not a copy) of idle-CPU percentage per tasktracker name. */
+public Map<String, Float> getCpuIdlePercentageMap() {
+	return cpuIdlePercentageMap;
 }
+/** Returns the live map (not a copy) of free memory in KB per tasktracker name. */
+public Map<String, Long> getFreeMemoryMap() {
+	return freeMemoryMap;
+}
+/** Returns the live map (not a copy) of swap memory in KB per tasktracker name. */
+public Map<String, Long> getSwapMemoryMap() {
+	return swapMemoryMap;
+}
+/** Returns the live map (not a copy) of cache memory in KB per tasktracker name. */
+public Map<String, Long> getCacheMemoryMap() {
+	return cacheMemoryMap;
+}
+/** Returns the sum of idle-CPU percentages accumulated for the map phase. */
+public double getIdleCPUPercentageSumWhileMap() {
+	return IdleCPUPercentageSumWhileMap;
+}
+/** Overwrites the sum of idle-CPU percentages of the map phase. */
+public void setIdleCPUPercentageSumWhileMap(double idleCPUPercentageSumWhileMap) {
+	IdleCPUPercentageSumWhileMap = idleCPUPercentageSumWhileMap;
+}
+/** Returns the number of heartbeats counted for the map phase. */
+public long getTotalHeartbeatsWhileMap() {
+	return totalHeartbeatsWhileMap;
+}
+/** Overwrites the number of heartbeats counted for the map phase. */
+public void setTotalHeartbeatsWhileMap(long totalHeartbeatsWhileMap) {
+	this.totalHeartbeatsWhileMap = totalHeartbeatsWhileMap;
+}
+/** Returns the sum of idle-CPU percentages accumulated for the reduce phase. */
+public double getIdleCPUPercentageSumWhileReduce() {
+	return IdleCPUPercentageSumWhileReduce;
+}
+/** Overwrites the sum of idle-CPU percentages of the reduce phase. */
+public void setIdleCPUPercentageSumWhileReduce(
+		double idleCPUPercentageSumWhileReduce) {
+	IdleCPUPercentageSumWhileReduce = idleCPUPercentageSumWhileReduce;
+}
+/** Returns the number of heartbeats counted for the reduce phase. */
+public long getTotalHeartbeatsWhileReduce() {
+	return totalHeartbeatsWhileReduce;
+}
+/** Overwrites the number of heartbeats counted for the reduce phase. */
+public void setTotalHeartbeatsWhileReduce(long totalHeartbeatsWhileReduce) {
+	this.totalHeartbeatsWhileReduce = totalHeartbeatsWhileReduce;
+}
+/** Returns the minimum free memory in KB of the map phase; Long.MAX_VALUE until a value is recorded. */
+public long getMinimumFreeMemoryInKBWhileMap() {
+	return minimumFreeMemoryInKBWhileMap;
+}
+/** Returns the minimum free memory in KB of the reduce phase; Long.MAX_VALUE until a value is recorded. */
+public long getMinimumFreeMemoryInKBWhileReduce() {
+	return minimumFreeMemoryInKBWhileReduce;
+}
+/** Returns the maximum swap memory in KB of the map phase; 0 until a larger value is recorded. */
+public long getMaximumSwapMemoryUsedInKBWhileMap() {
+	return maximumSwapMemoryUsedInKBWhileMap;
+}
+/** Returns the maximum swap memory in KB of the reduce phase; 0 until a larger value is recorded. */
+public long getMaximumSwapMemoryUsedInKBWhileReduce() {
+	return maximumSwapMemoryUsedInKBWhileReduce;
+}
+/** Returns the performance-diagnosis flag stored on this job. */
+public boolean isEnablePerformanceDiagnosis() {
+	return enablePerformanceDiagnosis;
+}
+/** Stores the performance-diagnosis flag on this job. */
+public void setEnablePerformanceDiagnosis(boolean enablePerformanceDiagnosis) {
+	this.enablePerformanceDiagnosis = enablePerformanceDiagnosis;
+}
+
+}
Index: src/mapred/org/apache/hadoop/mapred/JobTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/JobTracker.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/JobTracker.java	(working copy)
@@ -121,7 +121,13 @@
   private final TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
-
+  
+  // Number of CPU cores reported by each tasktracker, keyed by tracker name
+  private Map<String,Integer> tasktrackerCPUCores = new HashMap<String,Integer>();
+  // Total RAM reported by each tasktracker, keyed by tracker name
+  private Map<String,Long> tasktrackerTotalMemory = new HashMap<String,Long>();
+  // Swap usage in KB reported by each tasktracker on its 'restarted' heartbeat
+  private Map<String,Long> tasktrackerInitialSwapUsage = new HashMap<String,Long>();
   // system directories are world-wide readable and owner readable
   final static FsPermission SYSTEM_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0733); // rwx-wx-wx
@@ -1527,6 +1533,8 @@
   long limitMaxMemForReduceTasks;
   long memSizeForMapSlotOnJT;
   long memSizeForReduceSlotOnJT;
+  
+  private boolean enablePerformanceDiagnosis;
 
   private QueueManager queueManager;
 
@@ -1537,7 +1545,11 @@
     this(conf, generateNewIdentifier());
   }
   
-  JobTracker(JobConf conf, String identifier) 
+  public boolean isEnablePerformanceDiagnosis() { // value of "mapred.performance.diagnose" read in the constructor
+	return enablePerformanceDiagnosis;
+}
+
+JobTracker(JobConf conf, String identifier) 
   throws IOException, InterruptedException {   
     //
     // Grab some static constants
@@ -1560,7 +1572,7 @@
     // on startup, and can delete any files that we're done with
     this.conf = conf;
     JobConf jobConf = new JobConf(conf);
-
+    enablePerformanceDiagnosis = conf.getBoolean("mapred.performance.diagnose", false);
     initializeTaskMemoryRelatedConfig();
 
     // Read the hosts/exclude files to restrict access to the jobtracker.
@@ -2503,7 +2515,13 @@
     long now = System.currentTimeMillis();
     boolean isBlacklisted = false;
     if (restarted) {
-      faultyTrackers.markTrackerHealthy(status.getHost());
+    	//Save tasktracker info when it starts and if performance diagnosis enabled
+    	if(enablePerformanceDiagnosis) {
+    		tasktrackerCPUCores.put(trackerName, status.getNoOfCPUCores());
+        	tasktrackerTotalMemory.put(trackerName, status.getTotalRAM());
+        	tasktrackerInitialSwapUsage.put(trackerName, status.getSwapMemoryUsedInKB());
+    	}
+    	faultyTrackers.markTrackerHealthy(status.getHost());
     } else {
       isBlacklisted = 
         faultyTrackers.shouldAssignTasksToTracker(status.getHost(), now);
@@ -2549,7 +2567,55 @@
         }
       }
     }
-      
+    
+    // Save all the statistics if the performance diagnosis enabled
+    if(enablePerformanceDiagnosis){
+    	// The swap baseline is only stored on a 'restarted' heartbeat, so it may be missing for
+    	// this tracker; use the first value seen instead of unboxing null (NullPointerException).
+    	Long initialSwap = tasktrackerInitialSwapUsage.get(trackerName);
+    	if (initialSwap == null) {
+    		initialSwap = Long.valueOf(status.getSwapMemoryUsedInKB());
+    		tasktrackerInitialSwapUsage.put(trackerName, initialSwap);
+    	}
+    	// These heartbeat values are the same for every job, so they are computed once.
+    	long freeMemory = status.getFreeMemoryInKB() + status.getCacheMemoryInKB();
+    	long swapMemory = status.getSwapMemoryUsedInKB() - initialSwap.longValue();
+    	synchronized (jobs) {
+    		for (JobInProgress job : jobs.values()) {
+    			if(job.getStatus().getRunState() != JobStatus.RUNNING){
+    				continue;
+    			}
+    			if(job.getStatus().mapProgress() < 1.0) {
+    				// map phase
+    				job.IdleCPUPercentageSumWhileMap += status.getCpuIdlePercentage();
+    				job.totalHeartbeatsWhileMap += 1;
+    				if(job.minimumFreeMemoryInKBWhileMap > freeMemory) {
+    					job.minimumFreeMemoryInKBWhileMap = freeMemory;
+    				}
+    				if(job.maximumSwapMemoryUsedInKBWhileMap < swapMemory) {
+    					job.maximumSwapMemoryUsedInKBWhileMap = swapMemory;
+    				}
+    			}else {
+    				// reduce phase
+    				job.IdleCPUPercentageSumWhileReduce += status.getCpuIdlePercentage();
+    				job.totalHeartbeatsWhileReduce += 1;
+    				if(job.minimumFreeMemoryInKBWhileReduce > freeMemory) {
+    					job.minimumFreeMemoryInKBWhileReduce = freeMemory;
+    				}
+    				if(job.maximumSwapMemoryUsedInKBWhileReduce < swapMemory) {
+    					job.maximumSwapMemoryUsedInKBWhileReduce = swapMemory;
+    				}
+    			}
+    			// latest readings of this tracker, stored per running job
+    			job.getCpuIdlePercentageMap().put(trackerName,status.getCpuIdlePercentage());
+    			job.getFreeMemoryMap().put(trackerName,status.getFreeMemoryInKB());
+    			job.getSwapMemoryMap().put(trackerName,swapMemory);
+    			job.getCacheMemoryMap().put(trackerName, status.getCacheMemoryInKB());
+    		}
+    	}
+    }
+    
+    
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
     status.setLastSeen(now);
@@ -2991,7 +3057,7 @@
     }
     
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
-    
+    job.setEnablePerformanceDiagnosis(enablePerformanceDiagnosis);
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
       new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
@@ -3864,4 +3930,45 @@
       throw new IOException(jobStr.toString() + msg);
     }
   }
+  
+  /**
+   * Sums the CPU core counts recorded for the tasktrackers.
+   * 
+   * @return the total number of CPU cores over all recorded tasktrackers
+   */
+  public int getTotalCPUCoresInCluster() {
+	  int coreCount = 0;
+	  for (Integer trackerCores : tasktrackerCPUCores.values()) {
+		coreCount += trackerCores.intValue();
+	  }
+	  return coreCount; 
+  }
+
+	/**
+	 * Gets the live (not copied) map from tasktracker name to its reported number of CPU cores.
+	 * 
+	 * @return the tasktracker cpu cores
+	 */
+	public Map<String, Integer> getTasktrackerCPUCores() {
+		return tasktrackerCPUCores;
+	}
+	
+	/**
+	 * Gets the live (not copied) map from tasktracker name to its reported total RAM.
+	 * 
+	 * @return the tasktracker total memory
+	 */
+	public Map<String, Long> getTasktrackerTotalMemory() {
+		return tasktrackerTotalMemory;
+	}
+
+	/**
+	 * Gets the live (not copied) map from tasktracker name to the swap usage in KB it first reported.
+	 * 
+	 * @return the tasktracker machine's initial swap usage
+	 */
+	public Map<String, Long> getTasktrackerInitialSwapUsage() {
+		return tasktrackerInitialSwapUsage;
+	}
+  
 }
Index: src/mapred/org/apache/hadoop/mapred/MapTask.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/MapTask.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/MapTask.java	(working copy)
@@ -24,6 +24,7 @@
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.NUMBER_OF_SPILLS;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -740,6 +741,7 @@
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineOutputCounter;
+    private final Counters.Counter spillCounter;
     
     private ArrayList<SpillRecord> indexCacheList;
     private int totalIndexCacheMemory;
@@ -799,6 +801,7 @@
       // counters
       mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
+      spillCounter = reporter.getCounter(NUMBER_OF_SPILLS);
       Counters.Counter combineInputCounter = 
         reporter.getCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
@@ -1294,6 +1297,7 @@
         }
         LOG.info("Finished spill " + numSpills);
         ++numSpills;
+        spillCounter.increment(1);
       } finally {
         if (out != null) out.close();
       }
Index: src/mapred/org/apache/hadoop/mapred/PerformanceAnalyzer.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/PerformanceAnalyzer.java	(revision 0)
+++ src/mapred/org/apache/hadoop/mapred/PerformanceAnalyzer.java	(revision 0)
@@ -0,0 +1,574 @@
+package org.apache.hadoop.mapred;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapred.JobHistory.Keys;
+import org.apache.hadoop.mapred.JobHistory.TaskAttempt;
+import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.util.StringUtils;
+
+public class PerformanceAnalyzer {
+	public static Properties properties=null;
+	
+	/** A suggestion for one property; instances order by display priority, lowest number first. */
+	static class SuggestionMessage implements Comparable<SuggestionMessage>{
+		private String message;
+		private String colorCode;
+		private String property;
+		private Integer displayPriority = 3; // never null, so getDisplayPriority/compareTo cannot throw
+		public int getDisplayPriority() {
+			return displayPriority;
+		}
+		public void setDisplayPriority(int displayPriority) {
+			this.displayPriority = displayPriority;
+		}
+		public String getProperty() {
+			return property;
+		}
+		public void setProperty(String property) {
+			this.property = property;
+		}
+		public String getMessage() {
+			return message;
+		}
+		public void setMessage(String message) {
+			this.message = message;
+		}
+		public String getColorCode() {
+			return colorCode;
+		}
+		/** Stores the color and derives the priority: red=1, yellow=2, anything else (or null)=3. */
+		public void setColorCode(String colorCode) {
+			this.colorCode = colorCode;
+			if ("red".equals(colorCode)) {
+				this.displayPriority = 1;
+			} else {
+				// constant-first equals keeps a null color from throwing
+				this.displayPriority = "yellow".equals(colorCode) ? 2 : 3;
+			}
+		}
+		@Override
+		public int compareTo(SuggestionMessage o) {
+			return this.displayPriority.compareTo(o.displayPriority);
+		}
+	}
+	
+	/**
+	 * Loads the threshold values from /hadoop-performance-analysis.properties on the classpath;
+	 * when the file is absent an empty set is installed so that the getProperty defaults apply.
+	 * @throws IOException Signals that an I/O exception has occurred.
+	 */
+	public void setProperties() throws IOException {
+		Properties loaded = new Properties();
+		InputStream is = getClass().getResourceAsStream("/hadoop-performance-analysis.properties");
+		if (is != null) {
+			try { loaded.load(is); } finally { is.close(); } // always release the stream
+		}
+		properties = loaded;
+	}
+	
+	/**
+	 * Computes the mean length, in MB, of the entries listed under the job's input paths.
+	 * 
+	 * @param conf configuration object of this job
+	 * 
+	 * @return the mean entry size in MB's, or 0 when the listing is empty
+	 * 
+	 * @throws IOException Signals that an I/O exception has occurred.
+	 */
+	public static double getAverageFileSize(Configuration conf) throws IOException{
+		FileStatus[] inputFiles = FileSystem.get(conf).listStatus(getInputPaths(conf));
+		// no input, or the input data was deleted after the job completed
+		if(inputFiles.length == 0){
+			return 0;
+		}
+		final double bytesPerMB = 1024*1024;
+		double sizeInMB = 0;
+		for (FileStatus inputFile : inputFiles) {
+			sizeInMB += inputFile.getLen()/bytesPerMB;
+		}
+		return sizeInMB/inputFiles.length;
+	}
+	
+	/**
+	 * Computes which share of the listed input entries is strictly shorter than the current split size.
+	 * 
+	 * @param conf the configuration object
+	 * 
+	 * @return percentage (0-100) of entries smaller than the split size, or -1 when the listing is empty
+	 * 
+	 * @throws IOException Signals that an I/O exception has occurred.
+	 */
+	public static float getPercentFileSmallerThanSplitSize(Configuration conf) throws IOException{
+		FileStatus[] inputFiles = FileSystem.get(conf).listStatus(getInputPaths(conf));
+		// no input, or the input data was deleted after the job completed
+		if(inputFiles.length == 0) {
+			return -1;
+		}
+		final long splitSize = getCurrentSplitSize(conf);
+		long smallerFiles = 0;
+		for (FileStatus inputFile : inputFiles) {
+			// strictly smaller: an entry exactly one split long is not counted
+			if(inputFile.getLen() < splitSize) {
+				smallerFiles++;
+			}
+		}
+		float ratio = (float)smallerFiles/inputFiles.length;
+		return ratio*100;
+	}
+	
+	/**
+	 * Splits the "mapred.input.dir" property into its un-escaped paths.
+	 * 
+	 * @param conf the conf; an unset property is read as the empty string
+	 * 
+	 * @return the input path array
+	 */
+	public static Path[] getInputPaths(Configuration conf) {
+		String[] escapedDirs = StringUtils.split(conf.get("mapred.input.dir", ""));
+		Path[] inputPaths = new Path[escapedDirs.length];
+		int next = 0;
+		for (String escapedDir : escapedDirs) {
+			inputPaths[next++] = new Path(StringUtils.unEscapeString(escapedDir));
+		}
+		return inputPaths;
+	}
+	
+	/**
+	 * Derives the split size: the block size capped by the maximum and raised to the minimum split size.
+	 * 
+	 * @param conf the configuration object of this job
+	 * 
+	 * @return max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
+	 */
+	public static long getCurrentSplitSize(Configuration conf) {
+		final long minSize = conf.getLong("mapred.min.split.size", 1L);
+		final long maxSize = conf.getLong("mapred.max.split.size",Long.MAX_VALUE);
+		final long blockBytes = conf.getLong("dfs.block.size",64*1024*1024L);
+		return Math.max(minSize, Math.min(maxSize, blockBytes));
+	}
+	
+	/**
+	 * Gets the average duration of the COMPLETE (Map/Reduce) tasks.
+	 * 
+	 * @param taskReports the task(Map/Reduce) reports
+	 * @param conf the configuration object (not read here)
+	 * 
+	 * @return mean of finishTime - startTime over COMPLETE reports (no unit conversion is done); 0 when none is COMPLETE
+	 */
+	public static long getAverageDuration(TaskReport[] taskReports, Configuration conf) {
+		long durationSum = 0, completed = 0;
+		for (TaskReport report : taskReports) {
+			if(report.getCurrentStatus() == TIPStatus.COMPLETE) {
+				durationSum += report.getFinishTime() - report.getStartTime();
+				completed++;
+			}
+		}
+		// Divide by the tasks actually summed; an empty or all-incomplete array yields 0, not ArithmeticException.
+		return completed == 0 ? 0 : durationSum/completed;
+	}
+	
+	/**
+	 * Measures the map phase as latest finish time minus earliest start time over all map task attempts.
+	 * 
+	 * @param jobInfo the jobInfo object
+	 * 
+	 * @return the map phase duration
+	 */
+	public static long getMapPhaseDuration(JobInfo jobInfo) {
+		// Every attempt of a MAP task counts; no attempt status is checked.
+		long earliestStart = 0;
+		long latestFinish = 0;
+		Map<String, JobHistory.Task> allTasks = jobInfo.getAllTasks();
+		for (JobHistory.Task task : allTasks.values()) {
+			for (TaskAttempt attempt : task.getTaskAttempts().values()) {
+				long attemptStart = attempt.getLong(Keys.START_TIME);
+				long attemptFinish = attempt.getLong(Keys.FINISH_TIME);
+				if (!Values.MAP.name().equals(task.get(Keys.TASK_TYPE))) {
+					continue;
+				}
+				// 0 doubles as the "nothing recorded yet" marker for the start time
+				if (earliestStart == 0 || attemptStart < earliestStart) {
+					earliestStart = attemptStart;
+				}
+				if (attemptFinish > latestFinish) {
+					latestFinish = attemptFinish;
+				}
+			}
+		}
+		return latestFinish - earliestStart;
+	}
+	
+	/**
+	 * Measures the reduce phase as latest finish time minus earliest start time over all reduce task attempts.
+	 * 
+	 * @param jobInfo the jobInfo object
+	 * 
+	 * @return the reduce duration
+	 */
+	public static long getReduceDuration(JobInfo jobInfo) {
+		long earliestStart = 0;
+		long latestFinish = 0;
+		Map<String, JobHistory.Task> allTasks = jobInfo.getAllTasks();
+		for (JobHistory.Task task : allTasks.values()) {
+			for (TaskAttempt attempt : task.getTaskAttempts().values()) {
+				long attemptStart = attempt.getLong(Keys.START_TIME);
+				long attemptFinish = attempt.getLong(Keys.FINISH_TIME);
+				if (!Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE))) {
+					continue;
+				}
+				// 0 doubles as the "nothing recorded yet" marker for the start time
+				if (earliestStart == 0 || attemptStart < earliestStart) {
+					earliestStart = attemptStart;
+				}
+				if (attemptFinish > latestFinish) {
+					latestFinish = attemptFinish;
+				}
+			}
+		}
+		return latestFinish - earliestStart;
+	}
+	
+	
+	/**
+	 * Gets the no. of map iterations, computed as mapDuration divided by avgMapTime.
+	 * 
+	 * @param jobInfo the jobInfo object (not read by this method)
+	 * @param taskReports the task reports (not read by this method)
+	 * @param conf the configuration object (not read by this method)
+	 * @param mapDuration the map phase duration
+	 * @param avgMapTime the average map time
+	 * 
+	 * @return mapDuration/avgMapTime as a float; an infinite value or NaN when avgMapTime is 0
+	 */
+	public static float getMapIterations(JobInfo jobInfo, TaskReport[] taskReports, Configuration conf, long mapDuration, long avgMapTime){
+		return (float)mapDuration/avgMapTime;
+	}
+	
+	/**
+	 * Counts how many of the most heavily loaded reducers together hold loadFactor percent of the input.
+	 * 
+	 * @param reduceTasks the reduce tasks; a task whose counters cannot be parsed is left out
+	 * @param totalReduceInputRecords the total reduce input records
+	 * 
+	 * @return the no. of reducers having loadFactor(80% by default) percent of load
+	 */
+	public static long getReduceLoad(JobHistory.Task [] reduceTasks, long totalReduceInputRecords) {
+		int loadFactor = Integer.parseInt(properties.getProperty("reduce.load.percent", "80"));
+		// input record count of every reduce task whose counters could be parsed
+		List<Long> reduceInputRecords = new ArrayList<Long>();
+		for (JobHistory.Task reduceTask : reduceTasks) {
+			Counters counters;
+			try {
+				counters = Counters.fromEscapedCompactString(reduceTask.get(Keys.COUNTERS));
+			} catch (ParseException e) {
+				// The counters of this task are unreadable: report it and leave the task out
+				// instead of dereferencing a null Counters reference below.
+				e.printStackTrace();
+				continue;
+			}
+			long inputRecords = counters.findCounter("org.apache.hadoop.mapred.Task$Counter","REDUCE_INPUT_RECORDS").getValue();
+			reduceInputRecords.add(inputRecords);
+		}
+		Collections.sort(reduceInputRecords, Collections.reverseOrder());
+		// Walk from the largest reducer down until loadFactor percent of all records is covered.
+		float reduceRecords = ((float)loadFactor/100) * totalReduceInputRecords;
+		long records = 0, num = 0;
+		for (Long inputRecords : reduceInputRecords) {
+			records += inputRecords.longValue();
+			num++;
+			if(records >= reduceRecords){
+				break;
+			}
+		}
+		return num;
+	}
+	
+	/**
+	 * Adds up numTaskFailures() and numKilledTasks() over all map tasks of the job.
+	 * 
+	 * @param job the JobInProgress Object
+	 * 
+	 * @return the number failed and killed map tasks
+	 */
+	public static long getNumFailedAndKilledMapTasks(JobInProgress job){
+		// one running total over every map TaskInProgress of the job
+		TaskInProgress[] mapTasks = job.getMapTasks();
+		long failedOrKilled = 0;
+		for (TaskInProgress mapTask : mapTasks) {
+			failedOrKilled += mapTask.numTaskFailures() + mapTask.numKilledTasks();
+		}
+		return failedOrKilled;
+	}
+	
+	/**
+	 * Expresses the failed/killed map count as a percentage of the number of map tasks.
+	 * 
+	 * @param numFailedAndKilledMapTasks the number of failed and killed map tasks
+	 * @param numMapTasks the number of map tasks
+	 * 
+	 * @return the percentage of maps failed/killed; not finite when numMapTasks is 0
+	 */
+	public static float getPercentMapFailed(int numFailedAndKilledMapTasks, long numMapTasks){
+		return 100 * ((float)numFailedAndKilledMapTasks/numMapTasks);
+	}
+	
+	/**
+	 * Adds up numTaskFailures() and numKilledTasks() over all reduce tasks of the job.
+	 * 
+	 * @param job the jobInProgress Object
+	 * 
+	 * @return the number of failed and killed reduce tasks
+	 */
+	public static long getNumFailedAndKilledReduceTasks(JobInProgress job){
+		// one running total over every reduce TaskInProgress of the job
+		TaskInProgress[] reduceTasks = job.getReduceTasks();
+		long failedOrKilled = 0;
+		for (TaskInProgress reduceTask : reduceTasks) {
+			failedOrKilled += reduceTask.numTaskFailures() + reduceTask.numKilledTasks();
+		}
+		return failedOrKilled;
+	}
+	
+	/**
+	 * Expresses the failed/killed reduce count as a percentage of the number of reduce tasks.
+	 * 
+	 * @param numFailedAndKilledReduceTasks the number of failed and killed reduce tasks
+	 * @param numReduceTasks the number of reduce tasks
+	 * 
+	 * @return the percentage of reduce failed; not finite when numReduceTasks is 0
+	 */
+	public static float getPercentReduceFailed(int numFailedAndKilledReduceTasks, long numReduceTasks){
+		return 100 * ((float) numFailedAndKilledReduceTasks/numReduceTasks);
+	}
+	
+	/**
+	 * Builds the "NumMapTasks" suggestion from the swap and idle-CPU readings of the map phase.
+	 * 
+	 * @param averageIdleCPUPercentageWhileMap the average idle cpu percentage while map phase
+	 * @param maximumSwapMemoryUsedInKBWhileMap the maximum swap memory used in KBs while map phase
+	 * @param mapTaskCapacity the map task capacity of the cluster
+	 * @param numMapTask the total number of map tasks for this job
+	 * 
+	 * @return a red message when a threshold is exceeded and numMapTask > mapTaskCapacity, otherwise a green "OK"
+	 */
+	public static SuggestionMessage getNumMapTaskSuggestionMessage(float averageIdleCPUPercentageWhileMap, long maximumSwapMemoryUsedInKBWhileMap, long mapTaskCapacity, long numMapTask){
+		final long swapLimitKB = Long.parseLong(properties.getProperty("swap.memory.threshold", "10240"));
+		final float idleCpuLimit = Float.parseFloat(properties.getProperty("idle.cpu.max.percent.threshold", "15"));
+		// Both warnings require the job to have more map tasks than the given capacity.
+		final boolean overCapacity = numMapTask > mapTaskCapacity;
+		SuggestionMessage advice = new PerformanceAnalyzer.SuggestionMessage();
+		advice.setProperty("NumMapTasks");
+		if(overCapacity && maximumSwapMemoryUsedInKBWhileMap > swapLimitKB){
+			// the swap check is evaluated before the idle-CPU check
+			advice.setMessage("Decrease Map Tasks/Node or Memory per Task- Swap Memory Usage is " + maximumSwapMemoryUsedInKBWhileMap);
+			advice.setColorCode("red");
+		} else if(overCapacity && averageIdleCPUPercentageWhileMap > idleCpuLimit){
+			advice.setMessage("Increase Map Tasks/Node - CPU is idle for " + averageIdleCPUPercentageWhileMap + "% time");
+			advice.setColorCode("red");
+		} else {
+			advice.setMessage("OK");
+			advice.setColorCode("green");
+		}
+		return advice;
+	}
+	
+	/**
+	 * Gets the reduce tasks per tasktracker suggestion message.
+	 * 
+	 * @param averageIdleCPUPercentageWhileReduce the average idle cpu percentage while reduce phase
+	 * @param maximumSwapMemoryUsedInKBWhileReduce the maximum swap memory used in KBs while reduce phase
+	 * @param reduceTaskCapacity the reduce task capacity of the cluster
+	 * @param numReduceTask the total number of reduce task for this job
+	 * 
+	 * @return the reduce tasks per tasktracker suggestion message
+	 */
+	public static SuggestionMessage getNumReduceTaskSuggestionMessage(float averageIdleCPUPercentageWhileReduce, long maximumSwapMemoryUsedInKBWhileReduce, long reduceTaskCapacity, long numReduceTask){
+		final long swapLimitKB = Long.parseLong(properties.getProperty("swap.memory.threshold", "10240"));
+		final float idleCpuLimit = Float.parseFloat(properties.getProperty("idle.cpu.max.percent.threshold", "15"));
+		final boolean overCapacity = numReduceTask > reduceTaskCapacity;	// both warnings require more reduce tasks than the given capacity
+		String message = "OK";
+		String colorCode = "green";
+		if (overCapacity && maximumSwapMemoryUsedInKBWhileReduce > swapLimitKB) {
+			message = "Decrease Reduce Tasks/Node or Memory per Task- Swap Memory Usage is " + maximumSwapMemoryUsedInKBWhileReduce;
+			colorCode = "red";
+		} else if (overCapacity && averageIdleCPUPercentageWhileReduce > idleCpuLimit) {
+			message = "Increase Reduce Tasks/Node - CPU is idle for " + averageIdleCPUPercentageWhileReduce + "% time";
+			colorCode = "red";
+		}
+		SuggestionMessage result = new PerformanceAnalyzer.SuggestionMessage();
+		result.setProperty("NumReduceTasks");
+		result.setMessage(message);
+		result.setColorCode(colorCode);
+		return result;
+	}
+	
+	/**
+	 * Gets the split size suggestion message.
+	 * 
+	 * @param percentFileSmallerThanSplitSize the percentage of files smaller than split size
+	 * @param averageMapDuration the average map task duration
+	 * @param mapIterations the number of map tasks iterations
+	 * 
+	 * @return the split size suggestion message
+	 */
+	public static SuggestionMessage getSplitSizeSuggestionMessage(float percentFileSmallerThanSplitSize, long averageMapDuration, float mapIterations){
+		final float smallFilePercentLimit = Float.parseFloat(properties.getProperty("file.size.percent.threshold", "50.0"));
+		final long durationLimitMillis = Long.parseLong(properties.getProperty("average.map.task.duration.seconds", "30")) * 1000;	// configured in seconds
+		final float iterationLimit = Float.parseFloat(properties.getProperty("map.iterations", "15.0"));
+		final boolean fewSmallFiles = percentFileSmallerThanSplitSize != -1 && percentFileSmallerThanSplitSize < smallFilePercentLimit;	// -1 disables every warning
+		final boolean manySmallFiles = percentFileSmallerThanSplitSize != -1 && percentFileSmallerThanSplitSize > smallFilePercentLimit;
+		final boolean longMaps = averageMapDuration > durationLimitMillis;
+		String message = "OK";
+		String colorCode = "green";
+		if (fewSmallFiles && (averageMapDuration < durationLimitMillis || (longMaps && mapIterations > iterationLimit))) {
+			message = "Increase Split Size";
+			colorCode = "red";
+		} else if (longMaps && manySmallFiles) {
+			message = "Decrease Split Size";
+			colorCode = "red";
+		}
+		SuggestionMessage result = new PerformanceAnalyzer.SuggestionMessage();
+		result.setProperty("SplitSize");
+		result.setMessage(message);
+		result.setColorCode(colorCode);
+		return result;
+	}
+	
+	/**
+	 * Gets the sort factor suggestion message.
+	 * 
+	 * @param averageNumberOfSpillsPerMap the average number of spills per map
+	 * @param sortFactor the sort factor
+	 * @param sortBuffer the sort buffer in MBs
+	 * 
+	 * @return the sort factor suggestion message
+	 */
+	public static SuggestionMessage getSortFactorSuggestionMessage(long averageNumberOfSpillsPerMap, long sortFactor, long sortBuffer){
+		final float spillRatioLimit = Float.parseFloat(properties.getProperty("num.spill.factor", "3.0"));
+		// Average spills per map expressed as a multiple of the sort factor.
+		final float spillsPerSortFactor = (float)averageNumberOfSpillsPerMap/sortFactor;
+		String message = "OK";
+		String colorCode = "green";
+		if (spillsPerSortFactor > spillRatioLimit) {
+			message = "Increase Sort Factor - Spills/Map are " + spillsPerSortFactor + " times the current value";
+			colorCode = "red";
+		} else if (sortBuffer < sortFactor*10) {
+			message = "Ideally io.sort.mb=10*io.sort.factor";
+			colorCode = "yellow";
+		}
+		SuggestionMessage result = new PerformanceAnalyzer.SuggestionMessage();
+		result.setProperty("SortFactor");
+		result.setMessage(message);
+		result.setColorCode(colorCode);
+		return result;
+	}
+	
+	/**
+	 * Gets the sort buffer suggestion message.
+	 * 
+	 * @param spilledRecords the spilled records
+	 * @param mapOutputRecords the map output records
+	 * 
+	 * @return the sort buffer suggestion message
+	 */
+	public static SuggestionMessage getSortBufferSuggestionMessage(long spilledRecords, long mapOutputRecords){
+		// Reads the same "num.spill.factor" setting as the sort factor check.
+		final float ratioLimit = Float.parseFloat(properties.getProperty("num.spill.factor", "3.0"));
+		final float spillRatio = (float)spilledRecords/mapOutputRecords;
+		// Warn only when the spilled records exceed the map output records
+		// by more than the configured factor.
+		final boolean excessiveSpill = spillRatio > ratioLimit;
+		SuggestionMessage result = new PerformanceAnalyzer.SuggestionMessage();
+		result.setProperty("SortBuffer");
+		result.setMessage(excessiveSpill
+				? "Increase Sort Buffer - SpilledRecords are " + spillRatio + " times the MapOutputRecords"
+				: "OK");
+		result.setColorCode(excessiveSpill ? "red" : "green");
+		return result;
+	}
+	
+	/**
+	 * Gets the map output compression suggestion message.
+	 * 
+	 * @param mapOutputCompression the map output compression flag
+	 * @param compressionCodec the compression codec
+	 * @param inputFormatClass the input format class
+	 * 
+	 * @return the map output compression suggestion message
+	 */
+	public static SuggestionMessage getMapOutputCompressionSuggestionMessage(boolean mapOutputCompression, String compressionCodec, String inputFormatClass){
+		SuggestionMessage suggestionMessage = new PerformanceAnalyzer.SuggestionMessage();	// note: inputFormatClass is not used here
+		suggestionMessage.setProperty("MapOutputCompression");
+		if(!mapOutputCompression){
+			// Compression is off: advisory only, because the hint applies only to compressible map output.
+			suggestionMessage.setMessage("This value should be true if the map output data is compressible");
+			suggestionMessage.setColorCode("yellow");
+		} else if(!"com.hadoop.compression.lzo.LzoCodec".equals(compressionCodec)){
+			// Constant-first equals: a null codec is treated as "not LZO" instead of throwing a NullPointerException.
+			suggestionMessage.setMessage("Use LZO Compression for better performance");
+			suggestionMessage.setColorCode("green");
+		}else {
+			suggestionMessage.setMessage("OK");
+			suggestionMessage.setColorCode("green");
+		}
+		return suggestionMessage;
+	}
+	
+	/**
+	 * Gets the JVM options suggestion message.
+	 * 
+	 * @param averageCPUCoresPerTaskTracker the average CPU cores per task tracker
+	 * @param javaOpts the java opts property value
+	 * 
+	 * @return the JVM options suggestion message
+	 */
+	public static SuggestionMessage getJVMOptionsSuggestionMessage(long averageCPUCoresPerTaskTracker, String javaOpts){
+		SuggestionMessage suggestionMessage = new PerformanceAnalyzer.SuggestionMessage();
+		suggestionMessage.setProperty("JvmOptions");
+		if(averageCPUCoresPerTaskTracker > 2 && (javaOpts == null || !javaOpts.contains("-XX:+UseConcMarkSweepGC"))){
+			// More than two cores and no CMS flag: recommend the flag set. A null javaOpts counts as "flag not set" rather than throwing.
+			suggestionMessage.setMessage("Use -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseFastAccessorMethods -XX:-UseGCOverheadLimit -XX:+AggressiveOpts -server");
+			suggestionMessage.setColorCode("yellow");
+		}else {
+			suggestionMessage.setMessage("OK");
+			suggestionMessage.setColorCode("green");
+		}
+		return suggestionMessage;
+	}
+	
+	/**
+	 * Gets the percentage of time spent in GC.
+	 * 
+	 * @param totalTime the total time 
+	 * @param gcTime the GC time
+	 * 
+	 * @return the percentage time spent in GC
+	 */
+	public static float getPercentGCTimeSpend(long totalTime, long gcTime){
+		if (totalTime <= 0) {
+			// No positive total time: report 0 rather than dividing by it.
+			return 0;
+		}
+		final float gcFraction = (float)gcTime/totalTime;
+		return gcFraction*100;
+	}
+	public static Properties getProperties() {
+		return properties;	// returns the static field itself, not a copy
+	}
+	
+}
+
Index: src/mapred/org/apache/hadoop/mapred/Task.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/Task.java	(working copy)
@@ -21,11 +21,14 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,6 +67,9 @@
     MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES, 
     MAP_OUTPUT_BYTES,
+    NUMBER_OF_SPILLS,
+    MAX_MEMORY_USAGE,
+    GC_TIME_MILLIS,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
@@ -129,6 +135,7 @@
     skipRanges.skipRangeIterator();
   
   protected JobConf conf;
+  protected boolean enablePerformanceDiagnosis;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
   private final static int MAX_RETRIES = 10;
@@ -139,7 +146,7 @@
   protected final Counters.Counter spilledRecordsCounter;
   private String pidFile = "";
   protected TaskUmbilicalProtocol umbilical;
-
+ 
   ////////////////////////////////////////////
   // Constructors
   ////////////////////////////////////////////
@@ -524,6 +531,11 @@
       while (!taskDone.get()) {
         try {
           boolean taskFound = true; // whether TT knows about this task
+          if(enablePerformanceDiagnosis){
+        	  updateMemoryCounter();
+        	  updateGCCounter();
+          }
+          
           // sleep for a bit
           try {
             Thread.sleep(PROGRESS_INTERVAL);
@@ -569,6 +581,32 @@
         }
       }
     }
+    
+	/**
+	 * Updates the maximum memory usage counter.
+	 */
+	private void updateMemoryCounter() {
+		// MAX_MEMORY_USAGE is only ever raised here: it is bumped by the gap to a new, higher reading.
+		final long recordedPeak = this.getCounter(Counter.MAX_MEMORY_USAGE).getValue();
+		final Runtime jvm = Runtime.getRuntime();
+		final long usedNow = jvm.totalMemory() - jvm.freeMemory();
+		if (usedNow > recordedPeak) { this.getCounter(Counter.MAX_MEMORY_USAGE).increment(usedNow - recordedPeak); }
+	}
+	
+	/**
+	 * Update GC time spent counter.
+	 */
+	private void updateGCCounter() {
+		List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+          long timeMillis = 0;
+          for (GarbageCollectorMXBean gcBean : gcBeans) {
+              // getCollectionTime() returns -1 when the collector does not report a time; count that as 0.
+              timeMillis += Math.max(0L, gcBean.getCollectionTime());
+          }
+          // The collectors report cumulative time, so only the growth since the last update is added.
+          long lastCollectionTime = this.getCounter(Counter.GC_TIME_MILLIS).getValue();
+          this.getCounter(Counter.GC_TIME_MILLIS).increment(timeMillis - lastCollectionTime);
+	}
     public void startCommunicationThread() {
       if (pingThread == null) {
         pingThread = new Thread(this, "communication thread");
@@ -839,6 +877,7 @@
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
+      enablePerformanceDiagnosis = conf.getBoolean("mapred.performance.diagnose", false);
     } else {
       this.conf = new JobConf(conf);
     }
Index: src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
===================================================================
--- src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/Task_Counter.properties	(working copy)
@@ -16,4 +16,6 @@
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
 SPILLED_RECORDS.name=          Spilled Records
-
+NUMBER_OF_SPILLS.name=         Number Of Spills
+MAX_MEMORY_USAGE.name=		   Maximum Memory Usage
+GC_TIME_MILLIS.name=		   GC Time Millis
Index: src/mapred/org/apache/hadoop/mapred/TaskTracker.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/TaskTracker.java	(working copy)
@@ -17,9 +17,12 @@
  */
  package org.apache.hadoop.mapred;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
@@ -202,6 +205,13 @@
   private int maxCurrentMapTasks;
   private int maxCurrentReduceTasks;
   private int failures;
+  private int noOfCPUCores;
+  private long totalMemoryInKB;
+  private long swapMemoryUsedInKB;
+  private long freeMemoryInKB;
+  private long cacheMemoryInKB;
+  private float cpuIdlePercentage;
+  private boolean enablePerformanceDiagnosis;
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
@@ -898,10 +908,30 @@
    */
   public TaskTracker(JobConf conf) throws IOException {
     originalConf = conf;
+    enablePerformanceDiagnosis = conf.getBoolean("mapred.performance.diagnose", false);
     maxCurrentMapTasks = conf.getInt(
                   "mapred.tasktracker.map.tasks.maximum", 2);
     maxCurrentReduceTasks = conf.getInt(
                   "mapred.tasktracker.reduce.tasks.maximum", 2);
+    //calculate number of CPU cores and total memory on this tasktracker machine
+    if(enablePerformanceDiagnosis) {
+    	
+    	String[] shellCmd = {"grep", "processor", "/proc/cpuinfo"};
+        ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+        shexec.execute();
+        int exitcode = shexec.getExitCode();
+        String output = shexec.getOutput();
+        if (exitcode != 0) {
+          throw new IOException(" Error executing free command process exited with exit code " + exitcode);
+        }
+        calculateCPUAndMemoryInfo();
+        try {
+    		noOfCPUCores = output.split("\n").length;
+    	} catch (NumberFormatException e) {
+    		e.printStackTrace();
+    	}
+    }
+    
     this.jobTrackAddr = JobTracker.getAddress(conf);
     String infoAddr = 
       NetUtils.getServerAddress(conf,
@@ -934,7 +964,41 @@
     initialize();
   }
 
-  private void checkJettyPort(int port) throws IOException { 
+  /** Runs "top" for two iterations and returns everything it wrote to stdout, one line per "\n". */
+  private String executeTopCommand() throws IOException {
+		String[] shellCmd = { "top", "-n", "2", "-d", "0", "b" };
+		Process process = new ProcessBuilder(shellCmd).start();
+		BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+		StringBuilder output = new StringBuilder();
+		int exitCode = 0;
+		try {
+			// Drain stdout BEFORE waiting for exit: waiting first can deadlock once the
+			// child fills the pipe buffer and blocks on its next write.
+			String line;
+			while ((line = br.readLine()) != null) {
+				output.append(line);
+				output.append("\n");
+			}
+			exitCode = process.waitFor();
+		} catch (InterruptedException e) {
+			// Keep what was read so far, but restore the interrupt flag for the caller.
+			Thread.currentThread().interrupt();
+		} finally {
+			// Close all three pipes; otherwise every call leaves file descriptors open.
+			br.close();
+			process.getOutputStream().close();
+			process.getErrorStream().close();
+		}
+		// Exit codes 0 and 1 are accepted; anything higher is reported as a failure.
+		if (exitCode > 1) {
+			throw new IOException(
+					" Error executing top command process exited with exit code "
+							+ exitCode);
+		}
+		return output.toString();
+	}
+
+private void checkJettyPort(int port) throws IOException { 
     //See HADOOP-4744
     if (port < 0) {
       shuttingDown = true;
@@ -1154,6 +1218,10 @@
   private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
     // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
     boolean sendCounters;
+    if(enablePerformanceDiagnosis){
+    	calculateCPUAndMemoryInfo();
+    }
+    
     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
       sendCounters = true;
       previousUpdate = now;
@@ -1168,15 +1236,34 @@
     // else resend the previous status information.
     //
     if (status == null) {
-      synchronized (this) {
-        status = new TaskTrackerStatus(taskTrackerName, localHostname, 
-                                       httpPort, 
-                                       cloneAndResetRunningTaskStatuses(
-                                         sendCounters), 
-                                       failures, 
-                                       maxCurrentMapTasks,
-                                       maxCurrentReduceTasks); 
-      }
+    	if(enablePerformanceDiagnosis) {
+    		synchronized (this) {
+    	        status = new TaskTrackerStatus(taskTrackerName, localHostname, 
+    	                                       httpPort, 
+    	                                       cloneAndResetRunningTaskStatuses(
+    	                                         sendCounters), 
+    	                                       failures, 
+    	                                       maxCurrentMapTasks,
+    	                                       maxCurrentReduceTasks,
+    	                                       noOfCPUCores,
+    	                                       totalMemoryInKB,
+    	                                       cpuIdlePercentage,
+    	                                       freeMemoryInKB,
+    	                                       swapMemoryUsedInKB,
+    	                                       cacheMemoryInKB); 
+    	      }
+    	} else {
+    		synchronized (this) {
+    	        status = new TaskTrackerStatus(taskTrackerName, localHostname, 
+    	                                       httpPort, 
+    	                                       cloneAndResetRunningTaskStatuses(
+    	                                         sendCounters), 
+    	                                       failures, 
+    	                                       maxCurrentMapTasks,
+    	                                       maxCurrentReduceTasks); 
+    	      }
+    	}
+      
     } else {
       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
                "' with reponseId '" + heartbeatResponseId);
@@ -1257,6 +1344,76 @@
   }
 
   /**
+   * Calculate cpu and memory info. Executes and parses 'top' linux command output
+   * 
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  private void calculateCPUAndMemoryInfo() throws IOException {
+	    // top runs for two iterations; the loop below looks for the second "top..." header line.
+	    String output = executeTopCommand();
+	    int i;
+	    String[] topOutputArr = output.split("\n");
+	    // Starts at index 1 so that a header on the very first line is not matched.
+	    for (i = 1; i < topOutputArr.length; i++) {
+			if(topOutputArr[i].startsWith("top")){
+				break;
+			}
+		}
+	    int offset = i;
+	    // Lines offset+2 .. offset+4 are indexed below. Fail with the declared IOException when they
+	    // are missing, rather than with an unchecked ArrayIndexOutOfBoundsException.
+	    if (offset + 4 >= topOutputArr.length) {
+	    	throw new IOException("Unexpected output from top command: summary lines not found");
+	    }
+
+		// parsing idle cpu time; assumes a layout like "Cpu(s): ..us, ..sy, ..ni, 99.5%id, ..." - TODO confirm for the deployed top version
+		String[] cpuInfoArr = topOutputArr[offset + 2].split(",");		//line 3 is cpu info
+		String idleCPUPercentageStr = cpuInfoArr[3].trim();		// column 4 is idle cpu info
+		try {
+			cpuIdlePercentage = Float.parseFloat(idleCPUPercentageStr.split("%")[0]);
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse idle CPU percentage from top output: " + idleCPUPercentageStr, e);
+		}
+
+		// parsing memory info
+		// assumes each value is followed by a 'k' unit suffix, e.g. "Mem: 8174352k total, ..., 211072k free, ..." - TODO confirm
+		String[] memInfoArr = topOutputArr[offset + 3].split(",");		//line 4 is memory info
+		String totalMemInfo = memInfoArr[0];					//column 1 is total mem info
+		String totalMemStr = totalMemInfo.substring(totalMemInfo.indexOf(':') + 1, totalMemInfo.indexOf('k'));
+		try {
+			totalMemoryInKB = Long.parseLong(totalMemStr.trim());
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse total memory from top output: " + totalMemStr, e);
+		}
+
+		String freeMemInfo = memInfoArr[2];						//column 3 is free mem info
+		String freeMemStr = freeMemInfo.substring(0,freeMemInfo.indexOf('k'));
+		try {
+			freeMemoryInKB = Long.parseLong(freeMemStr.trim());
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse free memory from top output: " + freeMemStr, e);
+		}
+
+		// parsing swap info
+		String[] swapMemInfoArr = topOutputArr[offset + 4].split(",");		//line 5 is swap memory info
+		String usedSwapMemInfo = swapMemInfoArr[1];					//column 2 is used swap mem info
+		String usedSwapMemStr = usedSwapMemInfo.substring(0,usedSwapMemInfo.indexOf('k'));
+		try {
+			swapMemoryUsedInKB = Long.parseLong(usedSwapMemStr.trim());
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse used swap memory from top output: " + usedSwapMemStr, e);
+		}
+
+		String cacheMemInfo = swapMemInfoArr[3];					//column 4 is cache info
+		String cacheMemStr = cacheMemInfo.substring(0,cacheMemInfo.indexOf('k'));
+		try {
+			cacheMemoryInKB = Long.parseLong(cacheMemStr.trim());
+		} catch (NumberFormatException e) {
+			LOG.warn("Could not parse cache memory from top output: " + cacheMemStr, e);
+		}
+}
+
+/**
    * Return the total virtual memory available on this TaskTracker.
    * @return total size of virtual memory.
    */
Index: src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java	(revision 921731)
+++ src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java	(working copy)
@@ -48,6 +48,13 @@
   volatile long lastSeen;
   private int maxMapTasks;
   private int maxReduceTasks;
+  private int noOfCPUCores;
+  private long totalRAM;
+  private float cpuIdlePercentage;
+  private long swapMemoryUsedInKB;
+  private long freeMemoryInKB;
+  private long cacheMemoryInKB;
+  
    
   /**
    * Class representing a collection of resources on this tasktracker.
@@ -198,7 +205,10 @@
   public TaskTrackerStatus(String trackerName, String host, 
                            int httpPort, List<TaskStatus> taskReports, 
                            int failures, int maxMapTasks,
-                           int maxReduceTasks) {
+                           int maxReduceTasks, int noOfCPUCores,
+                           long totalMemoryInKB, float cpuIdlePercentage, 
+                           long freeMemoryInKB, long swapMemoryUsedInKB,
+                           long cacheMemory) {
     this.trackerName = trackerName;
     this.host = host;
     this.httpPort = httpPort;
@@ -208,8 +218,29 @@
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
     this.resStatus = new ResourceStatus();
+    this.noOfCPUCores = noOfCPUCores;
+    this.totalRAM = totalMemoryInKB;
+    this.cpuIdlePercentage = cpuIdlePercentage;
+    this.freeMemoryInKB = freeMemoryInKB;
+    this.swapMemoryUsedInKB = swapMemoryUsedInKB;
+    this.cacheMemoryInKB = cacheMemory;
   }
+  
+  public TaskTrackerStatus(String trackerName, String host, 
+          int httpPort, List<TaskStatus> taskReports, 
+          int failures, int maxMapTasks,
+          int maxReduceTasks) {
+this.trackerName = trackerName;
+this.host = host;
+this.httpPort = httpPort;
 
+this.taskReports = new ArrayList<TaskStatus>(taskReports);
+this.failures = failures;
+this.maxMapTasks = maxMapTasks;
+this.maxReduceTasks = maxReduceTasks;
+this.resStatus = new ResourceStatus();
+}
+
   /**
    */
   public String getTrackerName() {
@@ -334,6 +365,14 @@
     for (TaskStatus taskStatus : taskReports) {
       TaskStatus.writeTaskStatus(out, taskStatus);
     }
+    
+	out.writeInt(noOfCPUCores);
+    out.writeLong(totalRAM);
+    out.writeFloat(cpuIdlePercentage);
+    out.writeLong(freeMemoryInKB);
+    out.writeLong(swapMemoryUsedInKB);
+    out.writeLong(cacheMemoryInKB);
+
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -350,5 +389,37 @@
     for (int i = 0; i < numTasks; i++) {
       taskReports.add(TaskStatus.readTaskStatus(in));
     }
+    
+	this.noOfCPUCores = in.readInt();
+    this.totalRAM = in.readLong();
+    this.cpuIdlePercentage = in.readFloat();
+    this.freeMemoryInKB = in.readLong();
+    this.swapMemoryUsedInKB = in.readLong();
+    this.cacheMemoryInKB = in.readLong();
+
   }
+/** Returns the number of CPU cores carried in this status. */
+public int getNoOfCPUCores() {
+	return noOfCPUCores;
 }
+/** Returns the total memory carried in this status; the constructor fills it from totalMemoryInKB. */
+public long getTotalRAM() {
+	return totalRAM;
+}
+/** Returns the idle CPU percentage carried in this status. */
+public float getCpuIdlePercentage() {
+	return cpuIdlePercentage;
+}
+/** Returns the used swap memory, in KB, carried in this status. */
+public long getSwapMemoryUsedInKB() {
+	return swapMemoryUsedInKB;
+}
+/** Returns the free memory, in KB, carried in this status. */
+public long getFreeMemoryInKB() {
+	return freeMemoryInKB;
+}
+/** Returns the cache memory, in KB, carried in this status. */
+public long getCacheMemoryInKB() {
+	return cacheMemoryInKB;
+}
+}
Index: src/webapps/job/jobdetails.jsp
===================================================================
--- src/webapps/job/jobdetails.jsp	(revision 921731)
+++ src/webapps/job/jobdetails.jsp	(working copy)
@@ -383,6 +383,9 @@
 
 <hr>
 <a href="jobtracker.jsp">Go back to JobTracker</a><br>
+<% if(tracker.isEnablePerformanceDiagnosis()) {%>
+<a href="tasktrackerinfo.jsp?jobid=<%=jobId%>">TaskTracker Info</a><br>
 <%
+   }
 out.println(ServletUtil.htmlFooter());
 %>
Index: src/webapps/job/jobdetailshistory.jsp
===================================================================
--- src/webapps/job/jobdetailshistory.jsp	(revision 921731)
+++ src/webapps/job/jobdetailshistory.jsp	(working copy)
@@ -19,8 +19,10 @@
 <%
     String jobid = request.getParameter("jobid");
     String logFile = request.getParameter("logFile");
+    JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+    JobID jobIdObj = JobID.forName(jobid);
 	String encodedLogFileName = JobHistory.JobInfo.encodeJobHistoryFilePath(logFile);
-	
+	JobInProgress jobInProgress = (JobInProgress) tracker.getJob(jobIdObj);
     Path jobFile = new Path(logFile);
     String[] jobDetails = jobFile.getName().split("_");
     String jobUniqueString = jobDetails[0] + "_" +jobDetails[1] + "_" + jobid ;
@@ -132,7 +134,10 @@
       }
     }
 %>
-<b><a href="analysejobhistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>">Analyse This Job</a></b> 
+<b><a href="analysejobhistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>">Analyse This Job</a></b><br>
+<%//if(jobInProgress != null && jobInProgress.isEnablePerformanceDiagnosis()) { %>
+<b><a href="performanceanalysis.jsp?jobid=<%=jobid %>">Performance Analysis</a></b>
+<%//} %> 
 <hr/>
 <center>
 <table border="2" cellpadding="5" cellspacing="2">
Index: src/webapps/job/jobhistory.jsp
===================================================================
--- src/webapps/job/jobhistory.jsp	(revision 921731)
+++ src/webapps/job/jobhistory.jsp	(working copy)
@@ -26,6 +26,7 @@
 <hr>
 <h2>Available History </h2>
 <%
+JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
     PathFilter jobLogFileFilter = new PathFilter() {
       public boolean accept(Path path) {
         return !(path.getName().endsWith(".xml"));
Index: src/webapps/job/performanceanalysis.jsp
===================================================================
--- src/webapps/job/performanceanalysis.jsp	(revision 0)
+++ src/webapps/job/performanceanalysis.jsp	(revision 0)
@@ -0,0 +1,284 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="java.util.List"
+  import="java.util.Map"
+  import="java.util.ArrayList"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.PerformanceAnalyzer.*"
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<%
+String jobid = request.getParameter("jobid");
+JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+JobID jobIdObj = JobID.forName(jobid);
+// The parsed job history is read from the session attribute "job"; nothing below can be computed without it.
+JobInfo jobInfo = (JobInfo)request.getSession().getAttribute("job");
+if (jobInfo == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, "Job history is not available in this session."); return; }
+String jobFilePath = JobTracker.getLocalJobFilePath(jobIdObj);
+JobConf conf = new JobConf(jobFilePath);
+float averageIdleCPUPercentageWhileMap =  (float)Double.parseDouble(jobInfo.get(Keys.IDLE_CPU_PER_SUM_MAP))/jobInfo.getLong(Keys.TOTAL_HEARTBEAT_MAP); // idle-CPU sum / heartbeat count (float division: NaN or Infinity when the count is 0)
+float averageIdleCPUPercentageWhileReduce = (float)Double.parseDouble(jobInfo.get(Keys.IDLE_CPU_PER_SUM_REDUCE))/jobInfo.getLong(Keys.TOTAL_HEARTBEAT_REDUCE); // same ratio for the reduce side
+long totalCPUCoresInCluster = jobInfo.getLong(Keys.TOTAL_CPU_CORES); // taken from the job history, not from the live cluster
+Counters mapCounters = Counters.fromEscapedCompactString(jobInfo.get(Keys.MAP_COUNTERS));
+Counters reduceCounters = Counters.fromEscapedCompactString(jobInfo.get(Keys.REDUCE_COUNTERS));
+Counters.Counter spillCounter = mapCounters.findCounter("org.apache.hadoop.mapred.Task$Counter","NUMBER_OF_SPILLS");
+Counters.Counter gcMapCounter = mapCounters.findCounter("org.apache.hadoop.mapred.Task$Counter","GC_TIME_MILLIS");
+Counters.Counter gcReduceCounter = reduceCounters.findCounter("org.apache.hadoop.mapred.Task$Counter","GC_TIME_MILLIS");
+Counters.Counter spilledRecordsCounter = mapCounters.findCounter("org.apache.hadoop.mapred.Task$Counter","SPILLED_RECORDS");
+Counters.Counter mapOutputRecordsCounter = mapCounters.findCounter("org.apache.hadoop.mapred.Task$Counter","MAP_OUTPUT_RECORDS");
+Counters.Counter totalReduceInputRecords = reduceCounters.findCounter("org.apache.hadoop.mapred.Task$Counter","REDUCE_INPUT_RECORDS");
+PerformanceAnalyzer obj = new PerformanceAnalyzer();
+obj.setProperties();
+
+Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+int finishedMaps = jobInfo.getInt(Keys.FINISHED_MAPS)  ;
+int finishedReduces = jobInfo.getInt(Keys.FINISHED_REDUCES) ;
+JobHistory.Task [] mapTasks = new JobHistory.Task[finishedMaps]; // filled with successful map attempts by the loop that follows
+JobHistory.Task [] reduceTasks = new JobHistory.Task[finishedReduces]; // filled with successful reduce attempts by the loop that follows
+int mapIndex = 0 , reduceIndex=0; // next free slot in mapTasks / reduceTasks
+long avgMapTime = 0; // accumulated as a sum of durations first, divided by finishedMaps after the loop
+long avgReduceTime = 0; // sum of (finish - shuffle finished), divided by finishedReduces after the loop
+long avgShuffleTime = 0; // sum of (shuffle finished - start), divided by finishedReduces after the loop
+int numFailedMaps = 0; 
+int numKilledMaps = 0;
+int numFailedReduces = 0 ; 
+int numKilledReduces = 0;
+
+// Successful attempts feed the duration sums and the per-type arrays; failed/killed attempts are only counted.
+for (JobHistory.Task task : tasks.values()) {
+  boolean isMap = Values.MAP.name().equals(task.get(Keys.TASK_TYPE));
+  boolean isReduce = !isMap && Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE));
+  for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+    String status = attempt.get(Keys.TASK_STATUS);
+    if (Values.SUCCESS.name().equals(status)) {
+      long startTime = attempt.getLong(Keys.START_TIME);
+      long finishTime = attempt.getLong(Keys.FINISH_TIME);
+      if (isMap) {
+        // Arrays are sized from FINISHED_MAPS / FINISHED_REDUCES, so never write past them if more attempts succeeded.
+        if (mapIndex < mapTasks.length) {
+          mapTasks[mapIndex++] = attempt;
+        }
+        avgMapTime += finishTime - startTime;
+      } else if (isReduce) {
+        if (reduceIndex < reduceTasks.length) {
+          reduceTasks[reduceIndex++] = attempt;
+        }
+        long shuffleFinished = attempt.getLong(Keys.SHUFFLE_FINISHED);
+        avgShuffleTime += shuffleFinished - startTime;
+        avgReduceTime += finishTime - shuffleFinished;
+      }
+    } else if (Values.FAILED.name().equals(status)) {
+      if (isMap) {
+        numFailedMaps++;
+      } else if (isReduce) {
+        numFailedReduces++;
+      }
+    } else if (Values.KILLED.name().equals(status)) {
+      if (isMap) {
+        numKilledMaps++;
+      } else if (isReduce) {
+        numKilledReduces++;
+      }
+    }
+  }
+}
+	 
+// Turn the accumulated duration sums into per-task averages (left at 0 when nothing finished).
+if (finishedMaps > 0) {
+  avgMapTime /= finishedMaps;
+}
+if (finishedReduces > 0) {
+  avgReduceTime /= finishedReduces;
+  avgShuffleTime /= finishedReduces;
+}
+int reduceLoadFactor = Integer.parseInt(obj.getProperties().getProperty("reduce.load.percent", "80"));
+long mapPhaseTime = PerformanceAnalyzer.getMapPhaseDuration(jobInfo);
+long reducePhaseTime = PerformanceAnalyzer.getReduceDuration(jobInfo);
+float percentFileSmallerThanSplitSize = PerformanceAnalyzer.getPercentFileSmallerThanSplitSize(conf);
+float mapIterations = (float)mapPhaseTime/avgMapTime; // float division: Infinity/NaN rather than an exception when avgMapTime is 0
+// Integer division throws ArithmeticException on a zero divisor, so the two averages below fall back to 0.
+int numMapTasks = conf.getNumMapTasks();
+long averageNumberOfSpillsPerMap = numMapTasks > 0 ? spillCounter.getValue() / numMapTasks : 0L;
+String mapOutputCompressionCodec = conf.get("mapred.map.output.compression.codec");
+int taskTrackerCount = tracker.getClusterStatus().getTaskTrackers();
+long averageCPUCoresPerTaskTracker = taskTrackerCount > 0 ? totalCPUCoresInCluster / taskTrackerCount : 0L;
+long reduceLoad = PerformanceAnalyzer.getReduceLoad(reduceTasks,totalReduceInputRecords.getValue());
+
+SuggestionMessage numMapTaskSuggestion = PerformanceAnalyzer.getNumMapTaskSuggestionMessage(averageIdleCPUPercentageWhileMap,jobInfo.getLong(Keys.MAX_SWAP_MEM_MAP),tracker.getClusterStatus().getMaxMapTasks(),conf.getNumMapTasks()); // inputs: idle CPU, max swap used, cluster map capacity, job map count
+SuggestionMessage numReduceTaskSuggestion = PerformanceAnalyzer.getNumReduceTaskSuggestionMessage(averageIdleCPUPercentageWhileReduce,jobInfo.getLong(Keys.MAX_SWAP_MEM_REDUCE),tracker.getClusterStatus().getMaxReduceTasks(), conf.getNumReduceTasks()); // inputs: idle CPU, max swap used, cluster reduce capacity, job reduce count
+SuggestionMessage splitSizeSuggestion = PerformanceAnalyzer.getSplitSizeSuggestionMessage(percentFileSmallerThanSplitSize, avgMapTime, mapIterations);
+SuggestionMessage sortBufferSuggestion = PerformanceAnalyzer.getSortBufferSuggestionMessage(spilledRecordsCounter.getValue(),mapOutputRecordsCounter.getValue());
+SuggestionMessage sortFactorSuggestion = PerformanceAnalyzer.getSortFactorSuggestionMessage(averageNumberOfSpillsPerMap,conf.getInt("io.sort.factor",10),conf.getLong("io.sort.mb",10)); // the literal 10s are only fallbacks for keys missing from the job conf
+SuggestionMessage mapOutputCompressionSuggestion = PerformanceAnalyzer.getMapOutputCompressionSuggestionMessage(conf.getCompressMapOutput(),mapOutputCompressionCodec, conf.get("mapred.input.format.class"));
+SuggestionMessage jvmOptionsSuggestion = PerformanceAnalyzer.getJVMOptionsSuggestionMessage(averageCPUCoresPerTaskTracker,conf.get("mapred.child.java.opts"));
+List<SuggestionMessage> list = new ArrayList<SuggestionMessage>(); // every suggestion, rendered as one table row each further down the page
+list.add(numMapTaskSuggestion);
+list.add(numReduceTaskSuggestion);
+list.add(splitSizeSuggestion);
+list.add(sortBufferSuggestion);
+list.add(sortFactorSuggestion);
+list.add(mapOutputCompressionSuggestion);
+list.add(jvmOptionsSuggestion);
+Collections.sort(list); // natural ordering of SuggestionMessage; NOTE(review): its compareTo is not visible here - presumably by severity, confirm
+
+%>
+<%@page import="org.apache.hadoop.util.StringUtils"%>
+<%@page import="java.util.Iterator"%>
+<%@page import="java.util.Collections"%>
+<html>
+ <body>
+  <table border="1">
+   <thead>
+    <tr>
+     <th>Property</th>
+     <th>Current Value</th>
+     <th>Suggestion</th>
+     <th colspan="2">Description</th> <%-- also spans the colour-coded cell that every suggestion row appends --%>
+    </tr>
+   </thead>
+   <tbody>
+<% // One table row per suggestion, in the order left by sorting 'list'; the last cell of each row is its colour code.
+for (SuggestionMessage message : list) {
+	if(message.getProperty().equals("NumMapTasks")){
+%>
+	<tr>
+     <td>Map Tasks per Tasktracker</td>
+     <td><%=conf.get("mapred.tasktracker.map.tasks.maximum")%></td>
+     <td><%=message.getMessage()%></td>
+     <td>Average idle CPU percentage while map = <%=averageIdleCPUPercentageWhileMap %>%<br>
+         Total CPU cores in cluster = <%=totalCPUCoresInCluster %><br>
+         Total Map tasks = <%=conf.getNumMapTasks() %><br>
+         Minimum free memory while map = <%=jobInfo.getLong(Keys.MIN_FREE_MEM_MAP)%>KB<br>
+         Maximum swap memory used while map = <%=jobInfo.getLong(Keys.MAX_SWAP_MEM_MAP)%>KB<br>
+     </td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%	
+	}else if(message.getProperty().equals("NumReduceTasks")) {
+%>
+	<tr>
+     <td>Reduce Tasks per Tasktracker</td>
+     <td><%=conf.get("mapred.tasktracker.reduce.tasks.maximum")%></td>
+     <td><%=message.getMessage()%></td>
+     <td>Average idle CPU percentage while reduce = <%=averageIdleCPUPercentageWhileReduce %>%<br>
+         Total CPU cores in cluster = <%=totalCPUCoresInCluster %><br>
+         Total Reduce tasks = <%=conf.getNumReduceTasks() %><br>
+         Minimum free memory while Reduce = <%=jobInfo.getLong(Keys.MIN_FREE_MEM_REDUCE) %>KB<br>
+         Maximum swap memory used while Reduce = <%=jobInfo.getLong(Keys.MAX_SWAP_MEM_REDUCE) %>KB<br>
+     </td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%
+	}else if(message.getProperty().equals("SplitSize")){
+%>
+	<tr>
+     <td>Block Size/Split Sizes</td>
+     <td><%= PerformanceAnalyzer.getCurrentSplitSize(conf)%></td>
+     <td><%= message.getMessage() %></td>
+     <td><% if(percentFileSmallerThanSplitSize != -1){ %>
+     	 AverageFileSize = <%=PerformanceAnalyzer.getAverageFileSize(conf)%>MB<br>
+     	 CurrentSplitSize = <%= PerformanceAnalyzer.getCurrentSplitSize(conf)%><br>
+     	 <%=percentFileSmallerThanSplitSize%>% files are smaller than CurrentSplitSize<br>
+     	 <% } %>
+     	 AverageMapDuration = <%=StringUtils.formatTime(avgMapTime) %><br>
+     	 MapPhaseCompletionTime = <%=StringUtils.formatTime(mapPhaseTime) %><br>
+     	 MapIterations = <%=mapIterations %>
+     </td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%
+	}else if(message.getProperty().equals("SortBuffer")){
+%>
+	<tr>
+     <td>Sort Buffer</td>
+     <td><%=conf.get("io.sort.mb")%></td>
+     <td><%= message.getMessage()%></td>
+     <td>Map Output Records=<%=mapOutputRecordsCounter.getValue() %><br>
+     	 Spilled Records=<%=spilledRecordsCounter.getValue()%><br>
+     	 Total Spills=<%=spillCounter.getValue()%>
+     </td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%
+	}else if(message.getProperty().equals("SortFactor")){
+%>
+	<tr>
+     <td>Sort Factor</td>
+     <td><%=conf.get("io.sort.factor")%></td>
+     <td><%=message.getMessage() %></td>
+     <td>SortFactor = <%= conf.get("io.sort.factor")%><br>
+     	 AverageNumberOfSpillsPerMap = <%=averageNumberOfSpillsPerMap%><br>
+     	 TotalMapTasks = <%= conf.getNumMapTasks()%>
+     </td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%
+	}else if(message.getProperty().equals("MapOutputCompression")){
+%>
+	<tr>
+     <td>Map Output Compression</td>
+     <td><%=conf.getCompressMapOutput()%></td>
+     <td><%=message.getMessage()%></td>
+     <td>&nbsp;</td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%
+	}else if(message.getProperty().equals("JvmOptions")){ // GC averages fall back to 0 when the job has no map or no reduce tasks (e.g. map-only jobs) instead of dividing by zero
+%>
+	<tr>
+     <td>JVM Options</td>
+     <td><%=conf.get("mapred.child.java.opts")%></td>
+     <td><%=message.getMessage()%></td>
+     <td>Average GC Time per Map = <%=conf.getNumMapTasks() > 0 ? gcMapCounter.getValue()/conf.getNumMapTasks() : 0L%>ms<br>
+     	 Average GC Time per Reduce = <%=conf.getNumReduceTasks() > 0 ? gcReduceCounter.getValue()/conf.getNumReduceTasks() : 0L%>ms<br>
+     	 <%=PerformanceAnalyzer.getPercentGCTimeSpend(avgMapTime, conf.getNumMapTasks() > 0 ? gcMapCounter.getValue()/conf.getNumMapTasks() : 0L) %>% time spent in GC for each Map<br>
+     	 <%=PerformanceAnalyzer.getPercentGCTimeSpend(reducePhaseTime, conf.getNumReduceTasks() > 0 ? gcReduceCounter.getValue()/conf.getNumReduceTasks() : 0L) %>% time spent in GC for each Reduce<br>
+     </td>
+     <td width="30" bgcolor="<%=message.getColorCode()%>"></td>
+    </tr>
+<%	
+	}
+}
+%>   
+ 
+    <tr> <%-- The rows from here on are informational: they carry no SuggestionMessage and no colour cell. --%>
+     <td>Combiner</td>
+     <td><%=conf.get("mapreduce.combine.class")%></td>
+     <td></td>
+     <td>Use Combiner for the best Performance</td>
+    </tr>
+    <tr>
+     <td>Reduce Partitioning</td>
+     <td>&nbsp;</td>
+     <td>&nbsp;</td>
+     <td><%=reduceLoadFactor%>% of load is on <%=conf.getNumReduceTasks() > 0 ? ((float)reduceLoad/conf.getNumReduceTasks())* 100 : 0f%>% reducers (<%=reduceLoad%> reducers) </td>
+    </tr>
+    <tr>
+     <td>Map Task ReExecution</td>
+     <td>&nbsp;</td>
+     <td>&nbsp;</td>
+     <td>Total Map Tasks <%= conf.getNumMapTasks()%><br>
+         Failed &amp; Killed Map Tasks = <%=numFailedMaps + numKilledMaps %><br>
+         % Map Tasks Failed/Killed = <%=PerformanceAnalyzer.getPercentMapFailed(numFailedMaps + numKilledMaps, conf.getNumMapTasks()) %>
+     </td>
+    </tr>
+    <tr>
+     <td>Reduce Task ReExecution</td>
+     <td>&nbsp;</td>
+     <td>&nbsp;</td>
+     <td>Total Reduce Tasks <%= conf.getNumReduceTasks()%><br>
+         Failed &amp; Killed Reduce Tasks = <%=numFailedReduces + numKilledReduces %><br>
+         % Reduce Tasks Failed/Killed = <%=PerformanceAnalyzer.getPercentReduceFailed(numFailedReduces + numKilledReduces, conf.getNumReduceTasks()) %>
+     </td>
+    </tr>
+    <tr>
+     <td>Map/Reduce Speculative Execution</td>
+     <td><%=conf.getMapSpeculativeExecution()%>/<%=conf.getReduceSpeculativeExecution()%></td>
+     <td>&nbsp;</td>
+     <td>&nbsp;</td>
+    </tr>
+   </tbody> 
+  </table>
+ </body>
+ </html>
Index: src/webapps/job/tasktrackerinfo.jsp
===================================================================
--- src/webapps/job/tasktrackerinfo.jsp	(revision 0)
+++ src/webapps/job/tasktrackerinfo.jsp	(revision 0)
@@ -0,0 +1,48 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+%>
+<% 
+JobTracker tracker = (JobTracker) application.getAttribute("job.tracker"); // JobTracker stored in the servlet context under "job.tracker"
+String jobId = request.getParameter("jobid"); 
+JobID jobIdObj = JobID.forName(jobId);
+JobInProgress job = (JobInProgress) tracker.getJob(jobIdObj);
+if (job == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, "No such job is known to this JobTracker."); return; } // the row loop dereferences 'job', so stop here instead of failing with a NullPointerException
+%>
+<html>
+ <head>
+  <meta http-equiv="refresh" content="5"> <%-- asks the browser to reload this page every 5 seconds --%>
+ </head>
+ <body>
+ 	<table border="1"> <%-- one row per task tracker; the memory columns are labelled in KB --%>
+		<tr>
+			<th>Task Tracker</th>
+			<th>Total Memory(KB)</th>
+			<th>CPU Cores</th>
+			<th>CPU Idle Percentage</th>
+			<th>Free Memory(KB)</th>
+			<th>Cache Memory(KB)</th>
+			<th>Swap Memory Used(KB)</th>
+		</tr>
+ <%
+ // Emit one row per key of the job's CPU-idle-percentage map; the other maps are looked up with the same key.
+ for (String trackerName : job.getCpuIdlePercentageMap().keySet()) {
+%>
+		<tr>
+			<td><%= trackerName%></td>
+			<td><%= tracker.getTasktrackerTotalMemory().get(trackerName)%></td>
+			<td><%= tracker.getTasktrackerCPUCores().get(trackerName)%></td>
+			<td><%= job.getCpuIdlePercentageMap().get(trackerName).floatValue()%></td>
+			<td><%= job.getFreeMemoryMap().get(trackerName).floatValue()%></td>
+			<td><%= job.getCacheMemoryMap().get(trackerName).floatValue()%></td>
+			<td><%= job.getSwapMemoryMap().get(trackerName).floatValue()%></td>
+		</tr>
+	
+<%		
+	}
+ %>
+ 	</table>
+ </body>
+</html>
